package pers.vic.boot.util.thread;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.collections4.ListUtils;

/**
 * @description:多线程批量处理 ☆☆☆
 *                      如果要用集合收集处理结果，切记使用线程安全的集合，如CopyOnWriteArrayList，ConcurrentHashMap等☆☆☆
 *                      使用方法：BatchExecteOperation.init([int
 *                      partitionSize]).execute(List batchList,
 *                      BatchExecteOperationCallback callback).shutdown();
 * @author: Vic.xu
 * @date: 2020年3月11日 上午10:32:25
 */
public class BatchExecteOperation {

	private ThreadPoolExecutor threadPoolExecutor;

	/**
	 * list分割的默认大小
	 */
	private int partitionSize = 40;
	/**
	 * 是否是因异常终止
	 */
	private volatile boolean terminationByException = false;
	/**
	 * 因异常终止的信息
	 */
	private volatile String terminationMsg ="";

	private BatchExecteOperation() {
	}

	/**
	 * 
	 * @description: 初始化批量执行操作， 使用默认的分片size
	 * @author: Vic.xu
	 * @date: 2020年3月11日 下午1:00:59
	 * @return
	 */
	public static BatchExecteOperation init() {

		return init(DEFAULT_PARTITION_SIZE);
	}

	/**
	 * 
	 * @description: 初始化批量执行操作
	 * @author: Vic.xu
	 * @date: 2020年3月11日 上午11:23:47
	 * @param partitionSize list数据需要分割的默认大小(每个部分的数据大小)
	 * @return
	 */
	public static BatchExecteOperation init(int partitionSize) {
		BatchExecteOperation operation = new BatchExecteOperation();
		operation.partitionSize = partitionSize;
		operation.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
		return operation;
	}

	/**
	 * 
	 * @description: 执行批量操作，多线程执行完毕会自动关闭；
	 * @author: Vic.xu
	 * @date: 2020年3月11日 下午12:42:46
	 * @param batchList 需要批量操作的对象list，会根据partitionSize切分成n个list， 然后启动n个线程同步执行
	 * @param callback  ☆☆☆对切片后的list进行处理，如果要用集合收集处理结果，切记使用线程安全的集合，如CopyOnWriteArrayList，ConcurrentHashMap等
	 * @return
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	public BatchExecteOperation execute(List batchList, BatchExecteOperationCallback callback) {
		// 切片成lists.size()个 并启动size 个线程
		List<List<?>> lists = ListUtils.partition(batchList, partitionSize);
		System.out.println("切片个数 ：" + lists.size());
		lists.forEach(partition -> {
			threadPoolExecutor.execute(new Runnable() {
				@Override
				public void run() {
					if(terminationByException) {
						return;
					}
					try {
						callback.callback(partition);
					}catch (Exception e) {
						/**
						 * 标记线程异常 并记录异常MSG,另终止线程
						 * 在shutdown方法中抛出异常
						 */
						terminationByException = true;
						terminationMsg = e.getMessage();
						threadPoolExecutor.shutdown();
						
					}
					
				}

			});
		});
		threadPoolExecutor.shutdown();
		return this;
	}

	/**
	 * 
	 * @description: 阻塞，等待线程池中的线程全部执行完毕
	 * @author: Vic.xu
	 * @date: 2020年3月11日 下午12:43:15
	 * @return
	 * @throws InterruptedException
	 */
	public boolean shutdown() throws Exception {
		
		
		if (!threadPoolExecutor.isShutdown()) {
			threadPoolExecutor.shutdown();
		}
		//如果线程没有终止，并且也没有异常则一直阻塞
		while (!threadPoolExecutor.awaitTermination(2, TimeUnit.SECONDS) && !terminationByException);
		
		if(terminationByException) {
			throw new RuntimeException(terminationMsg);
		}
		
		return true;
	}

	/**
	 * 测试使用
	 */
	public static void main(String[] args) throws InterruptedException {
		List<Integer> list = Arrays.asList("1,2,3,4,5,6,7,8,9".split(",")).stream().map(Integer::valueOf)
				.collect(Collectors.toList());

		List<Integer> result = new CopyOnWriteArrayList<Integer>();
		long t1 = System.currentTimeMillis();
		System.out.println(t1);
		try {

			BatchExecteOperation.init(3).execute(list, new BatchExecteOperationCallback<Integer>() {
				@Override
				public void callback(List<Integer> partition) {
					partition.forEach(a -> {
						try {
							TimeUnit.SECONDS.sleep(1);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						if (a == 5) {
							System.out.println("a=" + a);
							throw new RuntimeException("5555555555555555");
						}
						System.out.println("add to result-->" + a);
						
						result.add(a * 2);
					});
				}

			}).shutdown();
			
		} catch (Exception e) {
			System.err.println("main" + e.getMessage());
			throw new RuntimeException(e.getMessage());
		}
		
		long t2 = System.currentTimeMillis();
		System.out.println(t2);
		System.out.println("耗时：" + ((t2 - t1) / 1000) + "s");
		System.out.println(result.size());
		result.forEach(a -> {
			System.out.print(a + "\t");
		});
	}
	
	private static int corePoolSize = 4;

	private static int maximumPoolSize = 4;

	// 单位秒
	private static int keepAliveTime = 30;
	
	/**
	 * list分割的默认大小
	 */
	private static final int DEFAULT_PARTITION_SIZE = 40;
	
	static {
		corePoolSize = Runtime.getRuntime().availableProcessors();

		maximumPoolSize = corePoolSize;
	}

}
